package cn.uncode.rpc.transport.support;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import cn.uncode.rpc.common.log.Logger;
import cn.uncode.rpc.common.log.LoggerFactory;
import cn.uncode.rpc.core.URL;
import cn.uncode.rpc.core.URLParam;
import cn.uncode.rpc.exception.TransportException;
import cn.uncode.rpc.spi.ExtensionLoader;
import cn.uncode.rpc.transport.ChannelFactory;
import cn.uncode.rpc.transport.Client;
import cn.uncode.rpc.transport.HeartBeatManager;
import cn.uncode.rpc.transport.HeartbeatFactory;
import cn.uncode.rpc.transport.RequestHandler;
import cn.uncode.rpc.transport.Server;
import cn.uncode.rpc.util.FrameworkUtil;


/**
 * 
 * abstract endpoint factory
 * 
 * <pre>
 * 		一些约定：
 * 
 * 		1） service :
 * 			1.1） not share channel :  某个service暴露服务的时候，不期望和别的service共享服务，明哲自保，比如你说：我很重要，我很重要。
 * 
 * 			1.2） share channel ： 某个service 暴露服务的时候，如果有某个模块，但是拆成10个接口，可以使用这种方式，不过有一些约束条件：接口的几个serviceConfig配置需要保持一致。
 * 				
 * 				不允许差异化的配置如下：
 * 					protocol, codec , serialize, maxContentLength , maxServerConnection , maxWorkerThread, workerQueueSize, heartbeatFactory
 * 				
 * 		2）心跳机制：
 * 
 * 			不同的protocol的心跳包格式可能不一样，无法进行强制，那么通过可扩展的方式，依赖heartbeatFactory进行heartbeat包的创建，
 * 			同时对于service的messageHandler进行wrap heartbeat包的处理。 
 * 
 * 			对于service来说，把心跳包当成普通的request处理，因为这种heartbeat才能够探测到整个service处理的关键路径的可用状况
 * 
 * </pre>
 * 
 */
public abstract class AbstractChannelFactory implements ChannelFactory {
	
	private static final Logger LOGGER = LoggerFactory.getLogger(AbstractChannelFactory.class);
	
	/** 维持share channel 的service列表 **/
    protected Map<String, Server> ipPort2ServerShareChannel = new HashMap<String, Server>();
    protected ConcurrentMap<Server, Set<String>> server2UrlsShareChannel = new ConcurrentHashMap<Server, Set<String>>();

    private HeartBeatManager heartbeatManager = null;
    
    public AbstractChannelFactory() {
    	heartbeatManager = new DefaultHeartBeatManager();
    	heartbeatManager.init();
    }

	@Override
	public Server createServer(URL url, RequestHandler requestHandler) {

        HeartbeatFactory heartbeatFactory = getHeartbeatFactory(url);
        requestHandler = heartbeatFactory.wrapRequestHandler(requestHandler);

        synchronized (ipPort2ServerShareChannel) {
            String ipPort = url.getServerPortStr();
            String protocolKey = FrameworkUtil.getProtocolKey(url);

            boolean shareChannel = url.getBooleanParameter(URLParam.SHARE_CHANNEL.getName(), URLParam.SHARE_CHANNEL.getBooleanValue());
            if (!shareChannel) { // 独享一个端口
            	LOGGER.info(this.getClass().getSimpleName() + " create no_share_channel server: url={}", url);
                // 如果端口已经被使用了，使用该server bind 会有异常
                return innerCreateServer(url, requestHandler);
            }

            LOGGER.info(this.getClass().getSimpleName() + " create share_channel server: url={}", url);

            Server server = ipPort2ServerShareChannel.get(ipPort);
            if (server != null) {
                // can't share service channel
                if (!FrameworkUtil.checkIfCanShallServiceChannel(server.getUrl(), url)) {
                    throw new TransportException(
                            "Service export Error: share channel but some config param is different, protocol or codec or serialize or maxContentLength or maxServerConnection or maxWorkerThread or heartbeatFactory, source="
                                    + server.getUrl() + " target=" + url);
                }
                saveServer2Urls(server2UrlsShareChannel, server, protocolKey);
                return server;
            }

            url = url.createCopy();
            url.setPath(""); // 共享server端口，由于有多个interfaces存在，所以把path设置为空

            server = innerCreateServer(url, requestHandler);
            ipPort2ServerShareChannel.put(ipPort, server);
            saveServer2Urls(server2UrlsShareChannel, server, protocolKey);

            return server;
        }
    
	}
	
	
    private <T> void saveServer2Urls(ConcurrentMap<T, Set<String>> map, T server, String namespace) {
        Set<String> sets = map.get(server);

        if (sets == null) {
            sets = new HashSet<String>();
            sets.add(namespace);
            map.putIfAbsent(server, sets); // 规避并发问题，因为有release逻辑存在，所以这里的sets预先add了namespace
            sets = map.get(server);
        }

        sets.add(namespace);
    }
	
	
	private HeartbeatFactory getHeartbeatFactory(URL url) {
		String heartbeatFactoryName = url.getParameter(URLParam.HEARTBEAT_FACTORY.getName(), URLParam.HEARTBEAT_FACTORY.getValue());
		HeartbeatFactory heartbeatFactory = ExtensionLoader.getExtensionLoader(HeartbeatFactory.class).getExtension(heartbeatFactoryName);
		if (heartbeatFactory == null) {
			throw new TransportException("HeartbeatFactory not exist: " + heartbeatFactoryName);
		}
		return heartbeatFactory;
	}

	@Override
	public Client createClient(URL url) {
		LOGGER.info(this.getClass().getSimpleName() + " create client: url={}", url);
        return createClient(url, heartbeatManager);
	}
	
	private Client createClient(URL url, HeartBeatManager heartBeatManager) {
        Client client = innerCreateClient(url);
        heartBeatManager.addClient(client);
        return client;
    }


	@Override
	public void safeReleaseResource(Server server, URL url) {
		safeReleaseResource(server, url, ipPort2ServerShareChannel, server2UrlsShareChannel);
	}
	
	
    private <T extends Server> void safeReleaseResource(T server, URL url, Map<String, T> ipPort2Endpoint,
            ConcurrentMap<T, Set<String>> endpoint2Urls) {
        boolean shareChannel = url.getBooleanParameter(URLParam.REFRESH_TIMESTAMP.getName(), URLParam.REFRESH_TIMESTAMP.getBooleanValue());

        if (!shareChannel) {
            server.stop();
            return;
        }

        synchronized (ipPort2Endpoint) {
            String ipPort = url.getServerPortStr();
            String protocolKey = FrameworkUtil.getProtocolKey(url);

            if (server != ipPort2Endpoint.get(ipPort)) {
            	server.stop();
                return;
            }

            Set<String> urls = endpoint2Urls.get(server);
            urls.remove(protocolKey);

            if (urls.isEmpty()) {
            	server.stop();
                ipPort2Endpoint.remove(ipPort);
                endpoint2Urls.remove(server);
            }
        }
    }

	@Override
	public void safeReleaseResource(Client client, URL url) {
		destory(client);
	}
	
	private <T extends Client> void destory(T client) {
		client.close();
		heartbeatManager.removeClient(client);
	}
	
    protected abstract Server innerCreateServer(URL url, RequestHandler requestHandler);

    protected abstract Client innerCreateClient(URL url);

}
